其他
这可能是全网Spring Cloud Gateway限流最完整的方案了!
本文来源:http://rrd.me/g6P3V
Built on Spring Framework 5, Project Reactor and Spring Boot 2.0 Able to match routes on any request attribute Predicates and filters are specific to routes Hystrix Circuit Breaker integration Spring Cloud DiscoveryClient integration Easy to write Predicates and Filters Request Rate Limiting Path Rewriting
一、常见的限流场景
1.1 限流的对象
限制某个接口一分钟内最多请求 100 次 限制某个用户的下载速度最多 100KB/S 限制某个用户同时只能对某个接口发起 5 路请求 限制某个 IP 来源禁止访问任何请求
1.2 限流的处理方式
拒绝服务 排队等待 服务降级
1.3 限流的架构
二、常见的限流算法
2.1 固定窗口算法(Fixed Window)
AtomicLong
、LongAdder
或 Semaphore
来实现计数,而在分布式场景下可以通过 Redis 的 INCR
和 EXPIRE
等命令并结合 EVAL
或 lua 脚本来实现,Redis 官网提供了几种简单的实现方式。无论是请求频率限流还是并发量限流都可以使用这个算法。2.2 滑动窗口算法(Rolling Window 或 Sliding Window)
2.3 漏桶算法(Leaky Bucket)
2.4 令牌桶算法(Token Bucket)
生成令牌:假设有一个装令牌的桶,最多能装 M 个,然后按某个固定的速度(每秒 r 个)往桶中放入令牌,桶满时不再放入; 消费令牌:我们的每次请求都需要从桶中拿一个令牌才能放行,当桶中没有令牌时即触发限流,这时可以将请求放入一个缓冲队列中排队等待,或者直接拒绝;
capacity
表示令牌桶大小,refillTokensPerOneMillis
表示填充速度,每毫秒填充多少个,availableTokens
表示令牌桶中还剩多少个令牌,lastRefillTimestamp
表示上一次填充时间。private final long capacity;
private final double refillTokensPerOneMillis;
private double availableTokens;
private long lastRefillTimestamp;
public TokenBucket(long capacity, long refillTokens, long refillPeriodMillis) {
this.capacity = capacity;
this.refillTokensPerOneMillis = (double) refillTokens / (double) refillPeriodMillis;
this.availableTokens = capacity;
this.lastRefillTimestamp = System.currentTimeMillis();
}
synchronized public boolean tryConsume(int numberTokens) {
refill();
if (availableTokens < numberTokens) {
return false;
} else {
availableTokens -= numberTokens;
return true;
}
}
private void refill() {
long currentTimeMillis = System.currentTimeMillis();
if (currentTimeMillis > lastRefillTimestamp) {
long millisSinceLastRefill = currentTimeMillis - lastRefillTimestamp;
double refill = millisSinceLastRefill * refillTokensPerOneMillis;
this.availableTokens = Math.min(capacity, availableTokens + refill);
this.lastRefillTimestamp = currentTimeMillis;
}
}
}
refill()
这个方法,在每次消费令牌时,计算当前时间和上一次填充的时间差,并根据填充速度计算出应该填充多少令牌。在重新填充令牌后,再判断请求的令牌数是否足够,如果不够,返回 false,如果足够,则减去令牌数,并返回 true。三、一些开源项目
3.1 Guava 的 RateLimiter
RateLimiter.create(5)
表示这个限流器容量为 5,并且每秒生成 5 个令牌,也就是每隔 200 毫秒生成一个。我们可以使用 limiter.acquire()
消费令牌,如果桶中令牌足够,返回 0,如果令牌不足,则阻塞等待,并返回等待的时间。我们连续请求几次:SmoothBursty
还具有应对突发的能力,而且 还允许消费未来的令牌,比如下面的例子:System.out.println(limiter.acquire(10));
System.out.println(limiter.acquire(1));
System.out.println(limiter.acquire(1));
System.out.println(limiter.acquire(1));
System.out.println(limiter.acquire(1));
System.out.println(limiter.acquire(1));
System.out.println(limiter.acquire(1));
System.out.println(limiter.acquire(1));
3.2 Bucket4j
Bucket Bandwidth Refill
Bucket
接口代表了令牌桶的具体实现,也是我们操作的入口。它提供了诸如 tryConsume
和 tryConsumeAndReturnRemaining
这样的方法供我们消费令牌。可以通过下面的构造方法来创建 Bucket
:if(bucket.tryConsume(1)) {System.out.println("ok");} else {
System.out.println("error");
}
Bandwidth
的意思是带宽,可以理解为限流的规则。Bucket4j 提供了两种方法来创建 Bandwidth:simple
和 classic
。下面是 simple 方式创建的 Bandwidth,表示桶大小为 10,填充速度为每分钟 10 个令牌:Bandwidth limit = Bandwidth.classic(10, filler);
Refill
用于填充令牌桶,可以通过它定义填充速度,Bucket4j 有两种填充令牌的策略:间隔策略(intervally) 和 贪婪策略(greedy)。在上面的例子中我们使用的是贪婪策略,如果使用间隔策略可以像下面这样创建 Refill
:基于令牌桶算法 高性能,无锁实现 不存在精度问题,所有计算都是基于整型的 支持通过符合 JCache API 规范的分布式缓存系统实现分布式限流 支持为每个 Bucket 设置多个 Bandwidth 支持同步和异步 API 支持可插拔的监听 API,用于集成监控和日志 不仅可以用于限流,还可以用于简单的调度
3.3 Resilience4j
BulkheadConfig bulkheadConfig = BulkheadConfig.custom()
.maxConcurrentCalls(150)
.maxWaitTime(100)
.build();
Bulkhead bulkhead = Bulkhead.of("backendName", bulkheadConfig);
// 创建一个 RateLimiter,每秒允许一次请求
RateLimiterConfig rateLimiterConfig = RateLimiterConfig.custom()
.timeoutDuration(Duration.ofMillis(100))
.limitRefreshPeriod(Duration.ofSeconds(1))
.limitForPeriod(1)
.build();
RateLimiter rateLimiter = RateLimiter.of("backendName", rateLimiterConfig);
// 使用 Bulkhead 和 RateLimiter 装饰业务逻辑
Supplier<String> supplier = () -> backendService.doSomething();
Supplier<String> decoratedSupplier = Decorators.ofSupplier(supplier)
.withBulkhead(bulkhead)
.withRateLimiter(rateLimiter)
.decorate();
// 调用业务逻辑
Try<String> try = Try.ofSupplier(decoratedSupplier);
assertThat(try.isSuccess()).isTrue();
3.4 其他
https://github.com/mokies/ratelimitj https://github.com/wangzheng0822/ratelimiter4j https://github.com/wukq/rate-limiter https://github.com/marcosbarbero/spring-cloud-zuul-ratelimit https://github.com/onblog/SnowJena https://gitee.com/zhanghaiyang/spring-boot-starter-current-limiting https://github.com/Netflix/concurrency-limits
四、在网关中实现限流
Request Rate Limiting
,说明网关自带了限流的功能,但是 Spring Cloud Gateway 自带的限流有很多限制,譬如不支持单机限流,不支持并发量限流,而且它的请求频率限流也是不尽人意,这些都需要我们自己动手来解决。4.1 实现单机请求频率限流
RateLimiter
,如下:Mono<RateLimiter.Response> isAllowed(String routeId, String id);
}
isAllowed
,第一个参数 routeId
表示请求路由的 ID,根据 routeId 可以获取限流相关的配置,第二个参数 id
表示要限流的对象的唯一标识,可以是用户名,也可以是 IP,或者其他的可以从 ServerWebExchange
中得到的信息。我们看下 RequestRateLimiterGatewayFilterFactory
中对 isAllowed
的调用逻辑:public GatewayFilter apply(Config config) {
// 从配置中得到 KeyResolver
KeyResolver resolver = getOrDefault(config.keyResolver, defaultKeyResolver);
// 从配置中得到 RateLimiter
RateLimiter<Object> limiter = getOrDefault(config.rateLimiter,
defaultRateLimiter);
boolean denyEmpty = getOrDefault(config.denyEmptyKey, this.denyEmptyKey);
HttpStatusHolder emptyKeyStatus = HttpStatusHolder
.parse(getOrDefault(config.emptyKeyStatus, this.emptyKeyStatusCode));
return (exchange, chain) -> resolver.resolve(exchange).defaultIfEmpty(EMPTY_KEY)
.flatMap(key -> {
// 通过 KeyResolver 得到 key,作为唯一标识 id 传入 isAllowed() 方法
if (EMPTY_KEY.equals(key)) {
if (denyEmpty) {
setResponseStatus(exchange, emptyKeyStatus);
return exchange.getResponse().setComplete();
}
return chain.filter(exchange);
}
// 获取当前路由 ID,作为 routeId 参数传入 isAllowed() 方法
String routeId = config.getRouteId();
if (routeId == null) {
Route route = exchange
.getAttribute(ServerWebExchangeUtils.GATEWAY_ROUTE_ATTR);
routeId = route.getId();
}
return limiter.isAllowed(routeId, key).flatMap(response -> {
for (Map.Entry<String, String> header : response.getHeaders()
.entrySet()) {
exchange.getResponse().getHeaders().add(header.getKey(),
header.getValue());
}
// 请求允许,直接走到下一个 filter
if (response.isAllowed()) {
return chain.filter(exchange);
}
// 请求被限流,返回设置的 HTTP 状态码(默认是 429)
setResponseStatus(exchange, config.getStatusCode());
return exchange.getResponse().setComplete();
});
});
}
KeyResolver
接口的 resolve
方法就可以自定义要限流的对象了。Mono<String> resolve(ServerWebExchange exchange);
}
HostAddrKeyResolver
可以根据 IP 来限流:@Override
public Mono<String> resolve(ServerWebExchange exchange) {
return Mono.just(exchange.getRequest().getRemoteAddress().getAddress().getHostAddress());
}
}
RateLimiter
接口只提供了一个实现类 RedisRateLimiter
:Resilience4j
实现的:Config routeConfig = loadConfiguration(routeId);
// How many requests per second do you want a user to be allowed to do?
int replenishRate = routeConfig.getReplenishRate();
// How many seconds for a token refresh?
int refreshPeriod = routeConfig.getRefreshPeriod();
// How many tokens are requested per request?
int requestedTokens = routeConfig.getRequestedTokens();
final io.github.resilience4j.ratelimiter.RateLimiter rateLimiter = RateLimiterRegistry
.ofDefaults()
.rateLimiter(id, createRateLimiterConfig(refreshPeriod, replenishRate));
final boolean allowed = rateLimiter.acquirePermission(requestedTokens);
final Long tokensLeft = (long) rateLimiter.getMetrics().getAvailablePermissions();
Response response = new Response(allowed, getHeaders(routeConfig, tokensLeft));
return Mono.just(response);
}
Config routeConfig = loadConfiguration(routeId);
// How many requests per second do you want a user to be allowed to do?
int replenishRate = routeConfig.getReplenishRate();
// How much bursting do you want to allow?
int burstCapacity = routeConfig.getBurstCapacity();
// How many tokens are requested per request?
int requestedTokens = routeConfig.getRequestedTokens();
final Bucket bucket = bucketMap.computeIfAbsent(id,
(key) -> createBucket(replenishRate, burstCapacity));
final boolean allowed = bucket.tryConsume(requestedTokens);
Response response = new Response(allowed,
getHeaders(routeConfig, bucket.getAvailableTokens()));
return Mono.just(response);
}
4.2 实现分布式请求频率限流
RedisRateLimiter
,可以用于分布式限流。它的实现原理依然是基于令牌桶算法的,不过实现逻辑是放在一段 lua 脚本中的,我们可以在 src/main/resources/META-INF/scripts
目录下找到该脚本文件 request_rate_limiter.lua
:local timestamp_key = KEYS[2]
local rate = tonumber(ARGV[1])
local capacity = tonumber(ARGV[2])
local now = tonumber(ARGV[3])
local requested = tonumber(ARGV[4])
local fill_time = capacity/rate
local ttl = math.floor(fill_time*2)
local last_tokens = tonumber(redis.call("get", tokens_key))
if last_tokens == nil then
last_tokens = capacity
end
local last_refreshed = tonumber(redis.call("get", timestamp_key))
if last_refreshed == nil then
last_refreshed = 0
end
local delta = math.max(0, now-last_refreshed)
local filled_tokens = math.min(capacity, last_tokens+(delta*rate))
local allowed = filled_tokens >= requested
local new_tokens = filled_tokens
local allowed_num = 0
if allowed then
new_tokens = filled_tokens - requested
allowed_num = 1
end
if ttl > 0 then
redis.call("setex", tokens_key, ttl, new_tokens)
redis.call("setex", timestamp_key, ttl, now)
end
return { allowed_num, new_tokens }
cloud:
gateway:
routes:
- id: test
uri: http://httpbin.org:80/get
filters:
- name: RequestRateLimiter
args:
key-resolver: '#{@hostAddrKeyResolver}'
redis-rate-limiter.replenishRate: 1
redis-rate-limiter.burstCapacity: 3
key-resolver
使用 SpEL 表达式 #{@beanName}
从 Spring 容器中获取 hostAddrKeyResolver
对象,burstCapacity
表示令牌桶的大小,replenishRate
表示每秒往桶中填充多少个令牌,也就是填充速度。public RouteLocator myRoutes(RouteLocatorBuilder builder) {
return builder.routes()
.route(p -> p
.path("/get")
.filters(filter -> filter.requestRateLimiter()
.rateLimiter(RedisRateLimiter.class, rl -> rl.setBurstCapacity(3).setReplenishRate(1)).and())
.uri("http://httpbin.org:80"))
.build();
}
4.3 实现单机并发量限流
SemaphoreBulkhead
和 ThreadPoolBulkhead
,这也正是舱壁模式常见的两种实现方案:一种是带计数的信号量,一种是固定大小的线程池。考虑到多线程场景下的线程切换成本,默认推荐使用信号量。private static ExecutorService threadPool = Executors.newFixedThreadPool(100);
private static Semaphore semaphore = new Semaphore(10);
public static void main(String[] args) {
for (int i = 0; i < 100; i++) {
threadPool.execute(new Runnable() {
@Override
public void run() {
try {
semaphore.acquire();
System.out.println("Request processing ...");
semaphore.release();
} catch (InterruptedException e) {
e.printStack();
}
}
});
}
threadPool.shutdown();
}
}
Semaphore
还有很多类也可以用作计数,比如 AtomicLong
或 LongAdder
,这在并发量限流中非常常见,只是无法提供像信号量那样的阻塞能力:private static ExecutorService threadPool = Executors.newFixedThreadPool(100);
private static AtomicLong atomic = new AtomicLong();
public static void main(String[] args) {
for (int i = 0; i < 100; i++) {
threadPool.execute(new Runnable() {
@Override
public void run() {
try {
if(atomic.incrementAndGet() > 10) {
System.out.println("Request rejected ...");
return;
}
System.out.println("Request processing ...");
atomic.decrementAndGet();
} catch (InterruptedException e) {
e.printStack();
}
}
});
}
threadPool.shutdown();
}
}
4.4 实现分布式并发量限流
IgniteSemaphore
用于创建分布式信号量,它的使用方式和 Semaphore
非常类似,参考这里。使用 Redis 的 ZSet 也可以实现分布式信号量,比如 这篇博客介绍的方法,还有《Redis in Action》这本电子书中也提到了这样的例子,教你如何实现 Counting semaphores。另外,Redisson 也实现了基于 Redis 的分布式信号量 RSemaphore,用法也和 Semaphore
类似。使用分布式信号量可以很容易实现分布式并发量限流,实现方式和上面的单机并发量限流几乎是一样的。INCR
就很容易实现,更有甚者,使用 MySQL 数据库也可以实现。只不过使用计数器要注意操作的原子性,每次请求时都要经过这三步操作:取计数器当前的值、判断是否超过阈值,超过则拒绝、将计数器的值自增。这其实和信号量的 P 操作是一样的,而释放就对应 V 操作。System.out.println("Request processing ...");
semaphore.release();
semaphore.acquire();
System.out.println("Request processing ...");
} catch (InterruptedException e) {
e.printStack();
} finally {
semaphore.release();
}
requests_xxx
(xxx 为请求 ID),value 为 1,并给这个 key 设置一个 TTL(如果你的应用中存在耗时非常长的请求,譬如对于一些 WebSockket 请求可能会持续几个小时,还需要开一个线程定期去刷新这个 key 的 TTL)。然后在判断并发量时,使用 KEYS
命令查询 requests_*
开头的 key 的个数,就可以知道当前一共有多少个请求,如果超过并发量上限则拒绝请求。这种方法可以很好的应对服务崩溃或重启的问题,由于每个 key 都设置了 TTL,所以经过一段时间后,这些 key 就会自动消失,就不会出现信号量占满不释放的情况了。但是这里使用 KEYS
命令查询请求个数是一个非常低效的做法,在请求量比较多的情况下,网关的性能会受到严重影响。我们可以把 KEYS
命令换成 SCAN
,性能会得到些许提升,但总体来说效果还是很不理想的。instances_xxx
(xxx 为实例 ID),value 为这个实例当前的并发量。同样的,我们为这个 key 设置一个 TTL,并且开启一个线程定期去刷新这个 TTL。每接受一个请求后,计数器加一,请求结束,计数器减一,这和单机场景下的处理方式一样,只不过在判断并发量时,还是需要使用 KEYS
或 SCAN
获取所有的实例,并计算出并发量的总和。不过由于实例个数是有限的,性能比之前的做法有了明显的提升。202009051130
这样的一个 key,value 为计数器,表示请求的数量。当接受一个请求后,在当前的时间窗口中加一,当请求结束,在当前的时间窗口中减一,注意,接受请求和请求结束的时间窗口可能不是同一个。另外,我们还需要一个本地列表来记录当前实例正在处理的所有请求和请求对应的时间窗口,并通过一个小于时间窗口的定时线程(如 30 秒)来迁移过期的请求,所谓过期,指的是请求的时间窗口和当前时间窗口不一致。那么具体如何迁移呢?我们首先需要统计列表中一共有多少请求过期了,然后将列表中的过期请求时间更新为当前时间窗口,并从 Redis 中上一个时间窗口移动相应数量到当前时间窗口,也就是上一个时间窗口减 X,当前时间窗口加 X。由于迁移线程定期执行,所以过期的请求总是会被移动到当前窗口,最终 Redis 中只有当前时间窗口和上个时间窗口这两个时间窗口中有数据,再早一点的窗口时间中的数据会被往后迁移,所以可以给这个 key 设置一个 3 分钟或 5 分钟的 TTL。判断并发量时,由于只有两个 key,只需要使用 MGET
获取两个值相加即可。下面的流程图详细描述了算法的运行过程:请求结束时,直接在 Redis 中当前时间窗口减一即可,就算是负数也没关系。请求列表中的该请求不用急着删除,可以打上结束标记,在迁移线程中统一删除(当然,如果请求的开始时间和结束时间在同一个窗口,可以直接删除); 迁移的时间间隔要小于时间窗口,一般设置为 30s; Redis 中的 key 一定要设置 TTL,时间至少为 2 个时间窗口,一般设置为 3 分钟; 迁移过程涉及到“从上一个时间窗口减”和“在当前时间窗口加”两个操作,要注意操作的原子性; 获取当前并发量可以通过 MGET
一次性读取两个时间窗口的值,不用GET
两次;获取并发量和判断并发量是否超限,这个过程也要注意操作的原子性。
总结
Keycloak Spring Security适配器的常用配置
如果你喜欢本文,欢迎关注我们
专注分享关于Spring的一切
关注我,加入Spring技术交流群